package com.itcast.hadoop.llyy.enhance;/**
 * Created by Administrator on 2019/4/22 0022.
 */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Custom {@link FileOutputFormat} that routes each record key to one of two
 * fixed files: keys containing {@code "tocrawl"} go to {@code /liuliang/tocrawl},
 * all other keys go to {@code /liuliang/enhancedLog}. Record values are ignored.
 *
 * <p>NOTE(review): both output paths are fixed, so every task that obtains a
 * record writer targets the same two files — confirm the job is configured so
 * that only one task writes through this format.
 *
 * @author ydf
 * @com kt
 * @create 2019-04-22 3:44 PM
 *
 * @param <K> key type; its {@code toString()} form is what gets written
 * @param <V> value type; not written
 **/
public class LogEnhanceOutputFormat<K, V> extends FileOutputFormat<K, V> {

    /** Destination for keys that do not contain the "tocrawl" marker. */
    private static final String ENHANCED_LOG_PATH = "/liuliang/enhancedLog";

    /** Destination for keys that contain the "tocrawl" marker. */
    private static final String TOCRAWL_PATH = "/liuliang/tocrawl";

    /**
     * Opens both output streams once and hands them to a new record writer.
     *
     * @param taskAttemptContext context of the running task; supplies the job configuration
     * @return a writer that owns both streams and closes them in {@code close}
     * @throws IOException if the file system cannot be obtained or a file cannot be created
     */
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Use the job's configuration rather than a fresh default one, so that
        // job-level settings (e.g. the default file system) are honoured.
        FileSystem fs = FileSystem.get(taskAttemptContext.getConfiguration());

        FSDataOutputStream enhancedOs = fs.create(new Path(ENHANCED_LOG_PATH));
        FSDataOutputStream tocrawlOs;
        try {
            tocrawlOs = fs.create(new Path(TOCRAWL_PATH));
        } catch (IOException | RuntimeException e) {
            // The second create failed: release the first stream before propagating.
            try {
                enhancedOs.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        return new LogEnhanceRecordWriter<K, V>(enhancedOs, tocrawlOs);
    }

    /**
     * Record writer that sends each key to one of two already-open streams.
     *
     * @param <K> key type; its {@code toString()} form is what gets written
     * @param <V> value type; not written
     */
    public static class LogEnhanceRecordWriter<K, V> extends RecordWriter<K, V> {
        private final FSDataOutputStream enhancedOs;
        private final FSDataOutputStream tocrawlOs;

        /**
         * Receives the streams through the constructor so they are opened once
         * per writer instead of once per record.
         *
         * @param enhancedOs stream for keys without the "tocrawl" marker
         * @param tocrawlOs  stream for keys with the "tocrawl" marker
         */
        public LogEnhanceRecordWriter(FSDataOutputStream enhancedOs, FSDataOutputStream tocrawlOs) {
            this.enhancedOs = enhancedOs;
            this.tocrawlOs = tocrawlOs;
        }

        /**
         * Writes the key's string form to the stream selected by its content.
         *
         * @param k record key; written as UTF-8 bytes with no separator added
         * @param v record value; ignored
         * @throws IOException if the underlying stream write fails
         */
        @Override
        public void write(K k, V v) throws IOException, InterruptedException {
            String record = k.toString();
            // contains() rather than endsWith(): per the original author's note,
            // keys end with a line terminator, so the marker is not the suffix.
            FSDataOutputStream target = record.contains("tocrawl") ? tocrawlOs : enhancedOs;
            // Explicit charset so the output does not depend on the node's platform default.
            target.write(record.getBytes(StandardCharsets.UTF_8));
        }

        /**
         * Closes both streams; the second is closed even if closing the first fails.
         *
         * @param taskAttemptContext context of the running task; unused
         * @throws IOException if closing either stream fails
         */
        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            try {
                if (enhancedOs != null) {
                    enhancedOs.close();
                }
            } finally {
                if (tocrawlOs != null) {
                    tocrawlOs.close();
                }
            }
        }
    }

}
